iT邦幫忙

2025 iThome Ironman Contest

DAY 29

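In the earlier posts we built a full-featured AI assistant system, but once it goes live you will face three hard questions:

  • ❓ Why did it suddenly get slow?
  • ❓ Why is this month's Gemini API bill so high?
  • ❓ A user reported a bug, but where are the logs?

This post builds a production-grade observability stack so that you know your system inside out and can detect, locate, and fix problems as soon as they appear.

Goal: move from "the system runs" to "the system runs well, is easy to see into, and costs less"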

1) Observability architecture: three pillars + a cost feedback loop

flowchart TB
    subgraph "User layer"
        USER[User request]
        WEBAPP[Web app]
    end

    subgraph "Application layer"
        CHATSERVICE[Chat Service]
        MEMSERVICE[Memory Service]
        TOOLEXEC[Tool Executor]
    end

    subgraph "AI layer"
        GEMINI[Gemini API]
        DISCOVERY[Discovery Engine]
    end

    subgraph "Three pillars of observability"
        direction LR
        LOGS[📝 Structured logs<br/>Cloud Logging]
        METRICS[📊 Metrics<br/>Cloud Monitoring]
        TRACES[🔍 Distributed tracing<br/>Cloud Trace]
    end

    subgraph "Analysis and action"
        DASHBOARD[📈 Real-time dashboard]
        ALERT[🚨 Smart alerting]
        BILLING[💰 Actual cost<br/>Billing Export]
        ANALYSIS[🧮 Cost analysis]
        OPTIMIZE[⚡ Automatic optimization]
    end

    USER --> WEBAPP --> CHATSERVICE
    CHATSERVICE --> GEMINI
    CHATSERVICE --> DISCOVERY
    CHATSERVICE --> TOOLEXEC

    CHATSERVICE -. logs .-> LOGS
    CHATSERVICE -. metrics .-> METRICS
    CHATSERVICE -. traces .-> TRACES

    GEMINI -. API usage .-> METRICS
    DISCOVERY -. retrieval quality .-> METRICS

    LOGS --> DASHBOARD
    METRICS --> DASHBOARD
    TRACES --> DASHBOARD

    METRICS --> ALERT
    LOGS --> ALERT

    BILLING --> ANALYSIS
    ANALYSIS --> OPTIMIZE
    OPTIMIZE -. tuning .-> CHATSERVICE

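Core design principles

| Pillar | Question it answers | Implementation | Key metrics |
|---|---|---|---|
| 📝 Logs | What happened? | Cloud Logging + jsonPayload | Error rate, exception stack traces |
| 📊 Metrics | How much? | Cloud Monitoring + DELTA/GAUGE | Latency, QPS, token usage |
| 🔍 Traces | Where's the bottleneck? | Cloud Trace + OpenTelemetry | End-to-end latency, dependencies |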

2) Structured logging: writing to jsonPayload correctly

Unified log format

# shared/logging_config.py
import logging
import traceback
from datetime import datetime
from typing import Any, Dict, Optional
from contextvars import ContextVar
import hashlib

from google.cloud import logging as cloud_logging
from google.cloud.logging_v2.handlers import StructuredLogHandler

# Use ContextVar to carry the request context
request_id_var: ContextVar[str] = ContextVar('request_id', default='')
user_id_var: ContextVar[str] = ContextVar('user_id', default='')
chat_id_var: ContextVar[str] = ContextVar('chat_id', default='')
trace_id_var: ContextVar[str] = ContextVar('trace_id', default='')

class StructuredLogger:
    """Production-grade structured logger"""

    def __init__(self, name: str, project_id: str):
        self.logger = logging.getLogger(name)
        self.project_id = project_id
        self._configure_cloud_logging()

    def _configure_cloud_logging(self):
        """Configure GCP Cloud Logging"""
        try:
            # Use StructuredLogHandler so that output lands in jsonPayload
            handler = StructuredLogHandler()
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)
        except Exception:
            # Fall back to a plain stream handler
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    def _build_log_entry(
        self,
        message: str,
        level: str,
        extra: Optional[Dict[str, Any]] = None,
        error: Optional[Exception] = None
    ) -> Dict[str, Any]:
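        """Build a structured log entry"""

        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "severity": level,
            "message": message,
            "service": "ai-assistant",
            "request_id": request_id_var.get(),
            "user_id": user_id_var.get(),
            "chat_id": chat_id_var.get(),
            **(extra or {})
        }

        # Attach the trace field only when a trace ID is known, so entries
        # never point at an empty "projects/<id>/traces/" resource
        trace_id = trace_id_var.get()
        if trace_id:
            log_entry["logging.googleapis.com/trace"] = (
                f"projects/{self.project_id}/traces/{trace_id}"
            )

        if error:
            log_entry["error"] = {
                "type": type(error).__name__,
                "message": str(error),
                # Format the exception that was passed in; format_exc() only
                # works while an exception is currently being handled
                "stacktrace": "".join(traceback.format_exception(
                    type(error), error, error.__traceback__
                ))
            }

        # Drop only missing values so legitimate zeros (e.g. tokens_used=0) survive
        return {k: v for k, v in log_entry.items() if v is not None and v != ""}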

    def _mask_sensitive_data(self, text: str) -> Dict[str, Any]:
        """Mask sensitive data: keep only a short hash and the length"""
        return {
            "content_hash": hashlib.sha256(text.encode()).hexdigest()[:16],
            "content_length": len(text),
        }

    def info(self, message: str, **kwargs):
        """Informational log"""
        entry = self._build_log_entry(message, "INFO", kwargs)
        self.logger.info(entry)

    def warning(self, message: str, **kwargs):
        """Warning log"""
        entry = self._build_log_entry(message, "WARNING", kwargs)
        self.logger.warning(entry)

    def error(self, message: str, error: Optional[Exception] = None, **kwargs):
        """Error log"""
        entry = self._build_log_entry(message, "ERROR", kwargs, error)
        self.logger.error(entry)

    def critical(self, message: str, error: Optional[Exception] = None, **kwargs):
        """Critical error log"""
        entry = self._build_log_entry(message, "CRITICAL", kwargs, error)
        self.logger.critical(entry)

    def user_action(self, action: str, **kwargs):
        """User action log (sensitive fields are dropped)"""
        safe_kwargs = {k: v for k, v in kwargs.items()
                      if k not in ['message_text', 'user_email', 'phone']}
        self.info(
            f"User action: {action}",
            log_type="user_action",
            action=action,
            **safe_kwargs
        )

    def ai_interaction(
        self,
        interaction_type: str,
        model: str,
        tokens_used: int = 0,
        latency_ms: int = 0,
        **kwargs
    ):
        """AI interaction log"""
        self.info(
            f"AI interaction: {interaction_type}",
            log_type="ai_interaction",
            interaction_type=interaction_type,
            model=model,
            tokens_used=tokens_used,
            latency_ms=latency_ms,
            **kwargs
        )

    def tool_execution(
        self,
        tool_name: str,
        status: str,
        execution_time_ms: int = 0,
        **kwargs
    ):
        """Tool execution log (parameters are logged as a hash only)"""
        safe_kwargs = kwargs.copy()
        if 'parameters' in safe_kwargs:
            safe_kwargs['parameters_hash'] = hashlib.sha256(
                str(safe_kwargs['parameters']).encode()
            ).hexdigest()[:16]
            safe_kwargs.pop('parameters')

        self.info(
            f"Tool execution: {tool_name} - {status}",
            log_type="tool_execution",
            tool_name=tool_name,
            status=status,
            execution_time_ms=execution_time_ms,
            **safe_kwargs
        )

    def cost_tracking(
        self,
        service: str,
        operation: str,
        estimated_cost_usd: float,
        **kwargs
    ):
        """Cost tracking log (estimate only)"""
        self.info(
            f"Cost tracking: {service} - {operation}",
            log_type="cost_tracking",
            service=service,
            operation=operation,
            estimated_cost_usd=estimated_cost_usd,
            is_estimate=True,
            **kwargs
        )

_logger: Optional[StructuredLogger] = None

def get_logger(project_id: str) -> StructuredLogger:
    """Return the global logger instance"""
    global _logger
    if _logger is None:
        _logger = StructuredLogger("ai-assistant", project_id)
    return _logger

def set_request_context(
    request_id: str,
    user_id: str = "",
    chat_id: str = "",
    trace_id: str = ""
):
    """Set the request context"""
    request_id_var.set(request_id)
    user_id_var.set(user_id)
    chat_id_var.set(chat_id)
    trace_id_var.set(trace_id)

def clear_request_context():
    """Clear the request context"""
    request_id_var.set("")
    user_id_var.set("")
    chat_id_var.set("")
    trace_id_var.set("")
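
To see how the context variables and the structured entry fit together without any GCP dependency, here is a minimal sketch that uses only the standard library. The names `build_entry` and `handle_request` are hypothetical and exist only for this illustration; the `StructuredLogger` above hands the dict to `StructuredLogHandler` instead of returning it.

```python
import json
from contextvars import ContextVar
from typing import Any, Dict

# Hypothetical stand-ins for the context variables in shared/logging_config.py
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
user_id_var: ContextVar[str] = ContextVar("user_id", default="")


def build_entry(message: str, severity: str = "INFO", **extra: Any) -> Dict[str, Any]:
    """Merge the request context into one flat dict, dropping empty fields."""
    entry = {
        "severity": severity,
        "message": message,
        "request_id": request_id_var.get(),
        "user_id": user_id_var.get(),
        **extra,
    }
    # Filter on None and "" rather than truthiness so that zeros are kept
    return {k: v for k, v in entry.items() if v is not None and v != ""}


def handle_request(request_id: str, user_id: str) -> Dict[str, Any]:
    """Set the context, build one entry, then restore the previous context."""
    tokens = [request_id_var.set(request_id), user_id_var.set(user_id)]
    try:
        return build_entry("AI interaction: chat", log_type="ai_interaction", tokens_used=0)
    finally:
        request_id_var.reset(tokens[0])
        user_id_var.reset(tokens[1])


if __name__ == "__main__":
    print(json.dumps(handle_request("req-123", "user-42"), indent=2))
```

Every entry built inside `handle_request` carries the same `request_id`, which is what makes it possible to filter all logs of one request in Cloud Logging. Filtering on `None` and `""` instead of truthiness keeps values such as `tokens_used=0` in the payload.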

FastAPI middleware: tracking requests automatically

# services/chat/app/middleware.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import time
import uuid
import jwt

from shared.logging_config import get_logger, set_request_context, clear_request_context

class ObservabilityMiddleware(BaseHTTPMiddleware):
    """Observability middleware"""

    def __init__(self, app, project_id: str):
        super().__init__(app)
        self.logger = get_logger(project_id)
        self.project_id = project_id

    def _extract_user_id(self, request: Request) -> str:
        """Extract user_id from a JWT or an OIDC header (for log correlation only)"""

        # 1. API Gateway / Cloud Run OIDC header (base64url-encoded JWT payload)
        userinfo_header = request.headers.get("X-Apigateway-Api-Userinfo")
        if userinfo_header:
            try:
                import base64
                import json
                # base64url values may omit padding, so restore it before decoding
                padded = userinfo_header + "=" * (-len(userinfo_header) % 4)
                userinfo = json.loads(base64.urlsafe_b64decode(padded))
                return userinfo.get("sub", "anonymous")
            except Exception:
                pass

        # 2. Authorization Bearer token
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            try:
                token = auth_header.split(" ", 1)[1]
                # The signature is NOT verified here: treat the result as an
                # untrusted label for logs and never use it for authorization
                decoded = jwt.decode(
                    token,
                    options={"verify_signature": False}
                )
                return decoded.get("sub") or decoded.get("user_id", "anonymous")
            except Exception:
                pass

        # 3. Fallback
        return request.headers.get("X-User-ID", "anonymous")

    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        # Cloud Trace IDs are 32 hex characters, so the fallback uses uuid4().hex (no hyphens)
        trace_id = request.headers.get("X-Cloud-Trace-Context", "").split("/")[0] or uuid.uuid4().hex
        user_id = self._extract_user_id(request)

        set_request_context(request_id, user_id, trace_id=trace_id)

        start_time = time.time()

        self.logger.info(
            "Request started",
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else "",
            user_agent=request.headers.get("user-agent", "")[:200]
        )

        try:
            response = await call_next(request)
            processing_time_ms = int((time.time() - start_time) * 1000)

            self.logger.info(
                "Request completed",
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                processing_time_ms=processing_time_ms
            )

            response.headers["X-Request-ID"] = request_id
            response.headers["X-Processing-Time-Ms"] = str(processing_time_ms)

            return response

        except Exception as e:
            processing_time_ms = int((time.time() - start_time) * 1000)
            self.logger.error(
                "Request failed",
                error=e,
                method=request.method,
                path=request.url.path,
                processing_time_ms=processing_time_ms
            )
            raise

        finally:
            clear_request_context()
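
The middleware takes the trace ID from the part of `X-Cloud-Trace-Context` before the first `/`. The header has the shape `TRACE_ID/SPAN_ID;o=OPTIONS`, so a slightly stricter parser can also recover the span ID and the sampling flag. The sketch below is one possible way to do that; `parse_trace_context` and `TraceContext` are hypothetical names, not part of the original service.

```python
import re
import uuid
from typing import NamedTuple, Optional

# Header shape: "TRACE_ID/SPAN_ID;o=OPTIONS"; the span ID and options are optional
_TRACE_HEADER = re.compile(r"^([0-9a-fA-F]{32})(?:/(\d+))?(?:;o=(\d))?$")


class TraceContext(NamedTuple):
    trace_id: str
    span_id: Optional[str]
    sampled: bool


def parse_trace_context(header: str) -> TraceContext:
    """Parse X-Cloud-Trace-Context; generate a fresh trace ID if it is absent or malformed."""
    match = _TRACE_HEADER.match(header.strip())
    if not match:
        # uuid4().hex is 32 lowercase hex characters, the same shape as a trace ID
        return TraceContext(uuid.uuid4().hex, None, False)
    trace_id, span_id, options = match.groups()
    return TraceContext(trace_id.lower(), span_id, options == "1")


if __name__ == "__main__":
    print(parse_trace_context("105445aa7843bc8bf206b12000100000/1;o=1"))
    print(parse_trace_context(""))
```

Validating the header before using it keeps malformed client input out of the `logging.googleapis.com/trace` field, where an invalid ID would break the link between log entries and traces.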


3) Custom metrics: using DELTA and GAUGE correctly

Metrics client implementation

# shared/metrics_client.py
from google.cloud import monitoring_v3
from google.api import metric_pb2 as ga_metric
from google.api import label_pb2 as ga_label
import time
import os
from typing import Dict, Any, Optional

class MetricsClient:
    """Custom metrics client"""

    def __init__(self, project_id: str):
        self.project_id = project_id
        self.project_name = f"projects/{project_id}"
        self.client = monitoring_v3.MetricServiceClient()
        self.resource_type, self.resource_labels = self._detect_environment()
        self._ensure_metric_descriptors()

    def _detect_environment(self) -> tuple:
        """Detect the runtime environment"""
        if os.getenv("K_SERVICE"):
            return "cloud_run_revision", {
                "project_id": self.project_id,
                "service_name": os.getenv("K_SERVICE", "unknown"),
                "revision_name": os.getenv("K_REVISION", "unknown"),
                "location": os.getenv("CLOUD_RUN_LOCATION", "asia-east1")
            }
        elif os.getenv("KUBERNETES_SERVICE_HOST"):
            return "k8s_container", {
                "project_id": self.project_id,
                "location": os.getenv("GKE_LOCATION", "us-central1"),
                "cluster_name": os.getenv("GKE_CLUSTER", "default"),
                "namespace_name": os.getenv("K8S_NAMESPACE", "default"),
                "pod_name": os.getenv("HOSTNAME", "unknown"),
                "container_name": "app"
            }
        else:
            return "global", {"project_id": self.project_id}

    def _ensure_metric_descriptors(self):
        """Make sure the metric descriptors exist"""

        descriptors = [
            {
                "type": "custom.googleapis.com/ai_assistant/interaction_latency",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.GAUGE,
                "value_type": ga_metric.MetricDescriptor.ValueType.DOUBLE,
                "description": "AI interaction latency in milliseconds",
                "display_name": "AI Interaction Latency",
                "unit": "ms",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="model",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="interaction_type",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/tokens_used",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.DELTA,
                "value_type": ga_metric.MetricDescriptor.ValueType.INT64,
                "description": "Tokens used per interaction",
                "display_name": "Tokens Used",
                "unit": "1",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="model",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="token_type",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/tool_execution_count",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.DELTA,
                "value_type": ga_metric.MetricDescriptor.ValueType.INT64,
                "description": "Tool execution count",
                "display_name": "Tool Execution Count",
                "unit": "1",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="tool_name",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="status",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/estimated_cost",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.DELTA,
                "value_type": ga_metric.MetricDescriptor.ValueType.DOUBLE,
                "description": "Estimated cost per operation (USD)",
                "display_name": "Estimated Cost",
                "unit": "USD",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="service",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    ),
                    ga_label.LabelDescriptor(
                        key="operation",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            },
            {
                "type": "custom.googleapis.com/ai_assistant/user_satisfaction",
                "metric_kind": ga_metric.MetricDescriptor.MetricKind.GAUGE,
                "value_type": ga_metric.MetricDescriptor.ValueType.DOUBLE,
                "description": "User satisfaction score (1-5)",
                "display_name": "User Satisfaction",
                "unit": "1",
                "labels": [
                    ga_label.LabelDescriptor(
                        key="user_id",
                        value_type=ga_label.LabelDescriptor.ValueType.STRING
                    )
                ]
            }
        ]

        for descriptor_config in descriptors:
            try:
                descriptor = ga_metric.MetricDescriptor(
                    type=descriptor_config["type"],
                    metric_kind=descriptor_config["metric_kind"],
                    value_type=descriptor_config["value_type"],
                    description=descriptor_config["description"],
                    display_name=descriptor_config["display_name"],
                    unit=descriptor_config.get("unit", "1"),
                    labels=descriptor_config["labels"]
                )

                self.client.create_metric_descriptor(
                    name=self.project_name,
                    metric_descriptor=descriptor
                )
            except Exception:
                pass

    def record_interaction_latency(
        self,
        latency_ms: float,
        model: str,
        interaction_type: str
    ):
        """Record AI interaction latency"""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/interaction_latency",
            float(latency_ms),  # the descriptor is DOUBLE, so never send an int
            {"model": model, "interaction_type": interaction_type},
            metric_kind="GAUGE"
        )

    def record_tokens_used(self, tokens: int, model: str, token_type: str):
        """Record token usage"""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/tokens_used",
            tokens,
            {"model": model, "token_type": token_type},
            metric_kind="DELTA"
        )

    def record_tool_execution(self, tool_name: str, status: str):
        """Record a tool execution"""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/tool_execution_count",
            1,
            {"tool_name": tool_name, "status": status},
            metric_kind="DELTA"
        )

    def record_estimated_cost(self, cost_usd: float, service: str, operation: str):
        """Record estimated cost"""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/estimated_cost",
            float(cost_usd),  # the descriptor is DOUBLE, so never send an int
            {"service": service, "operation": operation},
            metric_kind="DELTA"
        )

    def record_user_satisfaction(self, score: float, user_id: str):
        """Record user satisfaction"""
        self._write_time_series(
            "custom.googleapis.com/ai_assistant/user_satisfaction",
            float(score),  # the descriptor is DOUBLE, so never send an int
            {"user_id": user_id},
            metric_kind="GAUGE"
        )

    def _write_time_series(
        self,
        metric_type: str,
        value: float,
        labels: Dict[str, str],
        metric_kind: str = "GAUGE"
    ):
        """Write one point to a time series"""
        try:
            series = monitoring_v3.TimeSeries()
            series.metric.type = metric_type
            series.resource.type = self.resource_type

            for key, val in self.resource_labels.items():
                series.resource.labels[key] = val

            for key, val in labels.items():
                series.metric.labels[key] = val

            now = time.time()
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)

            # Build the messages from dicts: the proto-plus wrappers in
            # google-cloud-monitoring 2.x expose timestamps as datetime objects,
            # so assigning .seconds/.nanos or calling CopyFrom() does not work
            interval_fields = {"end_time": {"seconds": seconds, "nanos": nanos}}
            if metric_kind == "DELTA":
                # DELTA points need a non-zero interval; this uses a fixed 60 s window
                interval_fields["start_time"] = {"seconds": seconds - 60, "nanos": nanos}
            interval = monitoring_v3.TimeInterval(interval_fields)

            if isinstance(value, float):
                typed_value = {"double_value": value}
            else:
                typed_value = {"int64_value": int(value)}

            point = monitoring_v3.Point({"interval": interval, "value": typed_value})
            series.points = [point]

            self.client.create_time_series(
                name=self.project_name,
                time_series=[series]
            )
        except Exception as e:
            print(f"⚠️ Failed to write metric: {e}")

_metrics_client: Optional[MetricsClient] = None

def get_metrics_client(project_id: str) -> MetricsClient:
    """Return the global metrics client"""
    global _metrics_client
    if _metrics_client is None:
        _metrics_client = MetricsClient(project_id)
    return _metrics_client
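
Calling `create_time_series` once per request can run into Cloud Monitoring's limit on how often points may be written to a single time series. One common way around this is to accumulate counter increments in memory and write one total per series on a timer. The sketch below shows that idea with a hypothetical `DeltaBuffer` class; the `write` callback stands in for something like `MetricsClient._write_time_series`, and the flush schedule is left to the caller.

```python
import threading
from collections import defaultdict
from typing import Callable, Dict, Tuple

LabelKey = Tuple[Tuple[str, str], ...]


class DeltaBuffer:
    """Accumulate counter increments in memory and emit one total per series on flush."""

    def __init__(self, write: Callable[[str, float, Dict[str, str]], None]):
        self._write = write  # assumed callback, e.g. a wrapper around the metrics client
        self._lock = threading.Lock()
        self._totals: Dict[Tuple[str, LabelKey], float] = defaultdict(float)

    def add(self, metric_type: str, value: float, labels: Dict[str, str]) -> None:
        # Sort the labels so that the same label set always maps to the same key
        key = (metric_type, tuple(sorted(labels.items())))
        with self._lock:
            self._totals[key] += value

    def flush(self) -> int:
        """Write every accumulated total once; return the number of series written."""
        with self._lock:
            pending, self._totals = self._totals, defaultdict(float)
        for (metric_type, label_key), total in pending.items():
            self._write(metric_type, total, dict(label_key))
        return len(pending)


if __name__ == "__main__":
    buf = DeltaBuffer(lambda metric, value, labels: print(metric, value, labels))
    buf.add("tokens_used", 100, {"model": "gemini-1.5-flash", "token_type": "input"})
    buf.add("tokens_used", 50, {"model": "gemini-1.5-flash", "token_type": "input"})
    buf.flush()
```

Swapping the totals out under the lock and writing afterwards keeps request threads from blocking on network calls during a flush.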


4) Distributed tracing, with sampling-rate control

OpenTelemetry integration

# shared/tracing_config.py
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from typing import Optional
import os
import functools

class TracingConfig:
    """Distributed tracing configuration"""

    def __init__(
        self,
        project_id: str,
        service_name: str = "ai-assistant",
        sample_rate: float = 0.1
    ):
        self.project_id = project_id
        self.service_name = service_name
        self.sample_rate = sample_rate
        self._setup_tracing()

    def _setup_tracing(self):
        """Set up Cloud Trace"""
        resource = Resource.create({
            "service.name": self.service_name,
            "service.version": "2.0.0"
        })

        sampler = ParentBased(root=TraceIdRatioBased(self.sample_rate))

        tracer_provider = TracerProvider(resource=resource, sampler=sampler)

        cloud_trace_exporter = CloudTraceSpanExporter(project_id=self.project_id)

        tracer_provider.add_span_processor(
            BatchSpanProcessor(cloud_trace_exporter)
        )

        trace.set_tracer_provider(tracer_provider)

    def instrument_fastapi(self, app):
        """Add tracing to FastAPI"""
        FastAPIInstrumentor.instrument_app(app)

    def instrument_httpx(self):
        """Add tracing to HTTPX"""
        HTTPXClientInstrumentor().instrument()

_tracer: Optional[trace.Tracer] = None

def get_tracer(project_id: str) -> trace.Tracer:
    """Return the global tracer"""
    global _tracer
    if _tracer is None:
        sample_rate = float(os.getenv("TRACE_SAMPLE_RATE", "0.1"))
        config = TracingConfig(project_id, sample_rate=sample_rate)
        _tracer = trace.get_tracer(__name__)
    return _tracer

def trace_function(name: str):
    """Tracing decorator for async functions (the wrapper awaits the wrapped call)"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span(name):
                return await func(*args, **kwargs)
        return wrapper
    return decorator
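
The sampler above combines two ideas: `ParentBased` makes child spans follow the sampling decision of their parent, and `TraceIdRatioBased` decides root spans from the trace ID itself, so every service that sees the same trace ID reaches the same decision. The following standard-library sketch illustrates deterministic ratio sampling in a simplified form; it is not the OpenTelemetry SDK source, and `should_sample` is a hypothetical name.

```python
import secrets

TRACE_ID_LIMIT = (1 << 64) - 1  # only the low 64 bits of the trace ID take part


def should_sample(trace_id_hex: str, rate: float) -> bool:
    """Deterministic ratio sampling: the same trace ID always gets the same decision."""
    if not 0.0 <= rate <= 1.0:
        raise ValueError("rate must be between 0 and 1")
    # A trace is kept when its low 64 bits fall below rate * 2**64
    bound = round(rate * (TRACE_ID_LIMIT + 1))
    return (int(trace_id_hex, 16) & TRACE_ID_LIMIT) < bound


if __name__ == "__main__":
    # Random 128-bit IDs; roughly 10% of them should be kept at rate 0.1
    ids = [secrets.token_hex(16) for _ in range(10_000)]
    kept = sum(should_sample(trace_id, 0.1) for trace_id in ids)
    print(f"kept {kept} of {len(ids)} traces")
```

Because the decision depends only on the trace ID, the trace IDs must be uniformly random for the ratio to hold, and lowering `TRACE_SAMPLE_RATE` reduces Cloud Trace ingestion roughly in proportion.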


5) Smart alerting: building logs-based metrics

First, create the logs-based metric

#!/bin/bash
# scripts/create-logs-based-metrics.sh

PROJECT_ID="your-project-id"

echo "📊 Creating logs-based metrics..."

# Error count. A logs-based counter metric is DELTA/INT64 by default, so
# `gcloud logging metrics create` only needs a description and a log filter.
gcloud logging metrics create error_count \
    --project="$PROJECT_ID" \
    --description="Error log count" \
    --log-filter='severity>=ERROR AND resource.type="cloud_run_revision" AND resource.labels.service_name="chat-service-enhanced"'

echo "✅ Logs-based metrics created"

Alert policy configuration (Terraform)

# monitoring/alert-policies.tf
resource "google_monitoring_alert_policy" "high_latency" {
  display_name = "AI Assistant - High Latency"
  combiner     = "OR"

  conditions {
    display_name = "Latency > 3000ms"

    condition_threshold {
      filter          = "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\""
      comparison      = "COMPARISON_GT"
      threshold_value = 3000
      duration        = "300s"

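      aggregations {
        alignment_period     = "60s"
        # interaction_latency is a GAUGE of DOUBLE values. Percentile aligners
        # only accept distribution-valued metrics, so align with the mean and
        # take the 95th percentile across series instead.
        per_series_aligner   = "ALIGN_MEAN"
        cross_series_reducer = "REDUCE_PERCENTILE_95"
      }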
    }
  }

  notification_channels = [google_monitoring_notification_channel.email.name]

  alert_strategy {
    auto_close = "604800s"
  }
}

resource "google_monitoring_alert_policy" "high_error_rate" {
  display_name = "AI Assistant - High Error Rate"
  combiner     = "OR"

  conditions {
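    # ALIGN_RATE yields errors per second, so 0.05 means about 3 errors per minute, not a 5% ratio
    display_name = "Error rate > 0.05 errors/s"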

    condition_threshold {
      filter          = "metric.type=\"logging.googleapis.com/user/error_count\" resource.type=\"cloud_run_revision\""
      comparison      = "COMPARISON_GT"
      threshold_value = 0.05
      duration        = "180s"

      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_RATE"
      }
    }
  }

  notification_channels = [google_monitoring_notification_channel.email.name]
}

resource "google_monitoring_notification_channel" "email" {
  display_name = "AI Assistant Alerts"
  type         = "email"

  labels = {
    email_address = "alerts@example.com"
  }
}
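
Both policies combine a threshold with a `duration`: the aligned value has to stay above the threshold for the whole duration before an incident opens. The sketch below is a deliberately simplified model of that behaviour, useful for reasoning about how quickly an alert can fire; it is not how Cloud Monitoring is implemented, and `first_alert_index` is a hypothetical helper.

```python
from typing import List, Optional


def first_alert_index(
    aligned_values: List[float],
    threshold: float,
    alignment_period_s: int,
    duration_s: int,
) -> Optional[int]:
    """Return the index of the aligned point at which the condition would fire, or None.

    Simplified model: the condition fires once every aligned point in a window
    spanning duration_s has been above the threshold.
    """
    needed = max(1, duration_s // alignment_period_s)
    streak = 0
    for i, value in enumerate(aligned_values):
        # Any point at or below the threshold resets the streak
        streak = streak + 1 if value > threshold else 0
        if streak >= needed:
            return i
    return None


if __name__ == "__main__":
    # One latency value per 60 s alignment period, in milliseconds
    latencies = [3500, 3600, 2000, 3100, 3200, 3300, 3400, 3500]
    print(first_alert_index(latencies, threshold=3000, alignment_period_s=60, duration_s=300))
```

With a 60 s alignment period and a 300 s duration, five consecutive points above 3000 ms are needed, so a single healthy minute in between delays the alert. A shorter duration reacts faster at the price of more noise.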


6) Monitoring dashboard configuration

Dashboard JSON

{
  "displayName": "AI Assistant Production Monitoring",
  "mosaicLayout": {
    "columns": 12,
    "tiles": [
      {
        "width": 6,
        "height": 4,
        "widget": {
          "title": "🚀 Request latency (P50, P95, P99)",
          "xyChart": {
            "dataSets": [
              {
                "timeSeriesQuery": {
                  "timeSeriesFilter": {
                    "filter": "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\"",
                    "aggregation": {
                      "alignmentPeriod": "60s",
                      "perSeriesAligner": "ALIGN_MEAN",
                      "crossSeriesReducer": "REDUCE_PERCENTILE_50"
                    }
                  }
                },
                "plotType": "LINE"
              },
              {
                "timeSeriesQuery": {
                  "timeSeriesFilter": {
                    "filter": "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\"",
                    "aggregation": {
                      "alignmentPeriod": "60s",
                      "perSeriesAligner": "ALIGN_MEAN",
                      "crossSeriesReducer": "REDUCE_PERCENTILE_95"
                    }
                  }
                },
                "plotType": "LINE"
              },
              {
                "timeSeriesQuery": {
                  "timeSeriesFilter": {
                    "filter": "metric.type=\"custom.googleapis.com/ai_assistant/interaction_latency\" resource.type=\"cloud_run_revision\"",
                    "aggregation": {
                      "alignmentPeriod": "60s",
                      "perSeriesAligner": "ALIGN_MEAN",
                      "crossSeriesReducer": "REDUCE_PERCENTILE_99"
                    }
                  }
                },
                "plotType": "LINE"
              }
            ]
          }
        }
      },
      {
        "xPos": 6,
        "width": 6,
        "height": 4,
        "widget": {
          "title": "💰 Estimated cost per hour",
          "xyChart": {
            "dataSets": [
              {
                "timeSeriesQuery": {
                  "timeSeriesFilter": {
                    "filter": "metric.type=\"custom.googleapis.com/ai_assistant/estimated_cost\" resource.type=\"cloud_run_revision\"",
                    "aggregation": {
                      "alignmentPeriod": "3600s",
                      "perSeriesAligner": "ALIGN_SUM",
                      "crossSeriesReducer": "REDUCE_SUM",
                      "groupByFields": ["metric.label.service"]
                    }
                  }
                },
                "plotType": "STACKED_AREA"
              }
            ]
          }
        }
      },
      {
        "yPos": 4,
        "width": 4,
        "height": 4,
        "widget": {
          "title": "🔧 Successful tool executions (per hour)",
          "scorecard": {
            "timeSeriesQuery": {
              "timeSeriesFilter": {
                "filter": "metric.type=\"custom.googleapis.com/ai_assistant/tool_execution_count\" metric.label.status=\"success\" resource.type=\"cloud_run_revision\"",
                "aggregation": {
                  "alignmentPeriod": "3600s",
                  "perSeriesAligner": "ALIGN_SUM",
                  "crossSeriesReducer": "REDUCE_SUM"
                }
              }
            }
          }
        }
      },
      {
        "xPos": 4,
        "yPos": 4,
        "width": 4,
        "height": 4,
        "widget": {
          "title": "🪙 Token usage (per hour)",
          "scorecard": {
            "timeSeriesQuery": {
              "timeSeriesFilter": {
                "filter": "metric.type=\"custom.googleapis.com/ai_assistant/tokens_used\" resource.type=\"cloud_run_revision\"",
                "aggregation": {
                  "alignmentPeriod": "3600s",
                  "perSeriesAligner": "ALIGN_SUM",
                  "crossSeriesReducer": "REDUCE_SUM"
                }
              }
            }
          }
        }
      },
      {
        "xPos": 8,
        "yPos": 4,
        "width": 4,
        "height": 4,
        "widget": {
          "title": "⭐ User satisfaction",
          "scorecard": {
            "timeSeriesQuery": {
              "timeSeriesFilter": {
                "filter": "metric.type=\"custom.googleapis.com/ai_assistant/user_satisfaction\" resource.type=\"cloud_run_revision\"",
                "aggregation": {
                  "alignmentPeriod": "3600s",
                  "perSeriesAligner": "ALIGN_MEAN",
                  "crossSeriesReducer": "REDUCE_MEAN"
                }
              }
            },
            "gaugeView": {
              "lowerBound": 1.0,
              "upperBound": 5.0
            }
          }
        }
      }
    ]
  }
}


7) Cost analysis: combining with Billing Export

BigQuery log analysis

# shared/cost_analyzer.py
from google.cloud import bigquery
from datetime import datetime, timedelta
from typing import Dict, List, Any

class CostAnalyzer:
    """Cost analyzer"""

    PRICING_REFERENCE = {
        "gemini-1.5-pro": {
            "input": 1.25 / 1_000_000,
            "output": 5.00 / 1_000_000,
        },
        "gemini-1.5-flash": {
            "input": 0.075 / 1_000_000,
            "output": 0.30 / 1_000_000,
        },
    }

    def __init__(self, project_id: str, dataset_id: str = "logging"):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)
        self.log_table = f"{project_id}.{dataset_id}.stdout"

    def analyze_daily_cost_estimate(self, date: str = None) -> Dict[str, Any]:
        """Analyze the daily cost estimate (from application-level logs)"""

        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")

        query = f"""
        SELECT
          jsonPayload.service as service,
          jsonPayload.operation as operation,
          SUM(CAST(jsonPayload.estimated_cost_usd AS FLOAT64)) as total_cost,
          COUNT(*) as operation_count
        FROM
          `{self.log_table}`
        WHERE
          DATE(timestamp) = '{date}'
          AND jsonPayload.log_type = 'cost_tracking'
        GROUP BY
          service, operation
        ORDER BY
          total_cost DESC
        """

        try:
            results = self.bq_client.query(query).to_dataframe()
            return {
                "date": date,
                "total_cost_estimate": float(results["total_cost"].sum()) if len(results) > 0 else 0.0,
                "breakdown": results.to_dict("records"),
                "note": "This is an estimate; for actual cost, query Cloud Billing Export"
            }
        except Exception as e:
            return {
                "date": date,
                "error": str(e),
                "total_cost_estimate": 0.0
            }

    def get_real_cost_from_billing(self, start_date: str, end_date: str) -> Dict[str, Any]:
        """Get actual cost from Cloud Billing Export"""

        # Billing Export to BigQuery must be set up first
        billing_table = f"{self.project_id}.billing_export.gcp_billing_export_v1_XXXXXX"

        query = f"""
        SELECT
          -- The alias must differ from the `service` column, otherwise GROUP BY is ambiguous
          service.description AS service_name,
          SUM(cost) AS total_cost,
          currency
        FROM
          `{billing_table}`
        WHERE
          DATE(_PARTITIONTIME) BETWEEN '{start_date}' AND '{end_date}'
          AND project.id = '{self.project_id}'
        GROUP BY
          service_name, currency
        ORDER BY
          total_cost DESC
        """

        try:
            results = self.bq_client.query(query).to_dataframe()
            return {
                "period": f"{start_date} to {end_date}",
                "total_cost": float(results["total_cost"].sum()),
                "breakdown": results.to_dict("records"),
                "source": "Cloud Billing Export (actual cost)"
            }
        except Exception as e:
            return {
                "error": str(e),
                "note": "Set up Cloud Billing Export first"
            }

    def get_optimization_suggestions(self) -> List[Dict[str, Any]]:
        """Return cost optimization suggestions."""

        suggestions = [
            {
                "type": "model_selection",
                "priority": "high",
                "title": "Smart model selection",
                "description": "Use Gemini 1.5 Flash for simple queries; this can save 94% of the cost",
                "implementation": "Pick the model dynamically in the chat handler based on query complexity"
            },
            {
                "type": "caching",
                "priority": "medium",
                "title": "Enable response caching",
                "description": "Cache repeated queries with Redis/Memorystore",
                "estimated_savings": "20-30%"
            },
            {
                "type": "prompt_optimization",
                "priority": "medium",
                "title": "Optimize prompt length",
                "description": "Shorten the System Instruction and trim unnecessary context",
                "estimated_savings": "10-15%"
            },
            {
                "type": "batch_processing",
                "priority": "low",
                "title": "Batch processing",
                "description": "Use the batch API for non-real-time tasks",
                "estimated_savings": "50% on batch workloads"
            }
        ]

        return suggestions

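The first suggestion, choosing a model by query complexity, can be sketched roughly as follows. This is a minimal illustration and not the article's implementation: `choose_model`, the keyword list, the 200-character threshold, and the `gemini-1.5-pro` fallback are all assumptions that you would tune against real traffic.

```python
# Hypothetical sketch: route simple queries to a cheaper model.
# Model names, keywords, and thresholds are illustrative assumptions.
CHEAP_MODEL = "gemini-1.5-flash"
STRONG_MODEL = "gemini-1.5-pro"

# Phrases that hint at multi-step reasoning (assumed list)
COMPLEX_HINTS = ("analyze", "compare", "step by step", "explain why", "summarize")


def choose_model(query: str, max_simple_chars: int = 200) -> str:
    """Return the cheaper model unless the query looks long or reasoning-heavy."""
    text = query.lower()
    looks_complex = len(text) > max_simple_chars or any(h in text for h in COMPLEX_HINTS)
    return STRONG_MODEL if looks_complex else CHEAP_MODEL


if __name__ == "__main__":
    print(choose_model("What time is it in Taipei?"))
    print(choose_model("Compare these two contracts and explain why they differ."))
```

A keyword heuristic like this is cheap to run on every request, but it will misroute some queries. Logging the chosen model next to the cost-tracking fields makes it possible to check whether the routing actually pays off.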

8) One-Command Deployment of the Full Monitoring Stack

Full deployment script

#!/bin/bash
# scripts/setup-full-observability.sh

set -e

PROJECT_ID="${1:-your-project-id}"
REGION="${2:-asia-east1}"
EMAIL="${3:-alerts@example.com}"

echo "🚀 Deploying the full observability stack..."
echo "Project: $PROJECT_ID"
echo "Region: $REGION"
echo "Alert email: $EMAIL"
echo ""

# 1. Enable APIs
echo "📡 Enabling required APIs..."
gcloud services enable \
    logging.googleapis.com \
    monitoring.googleapis.com \
    cloudtrace.googleapis.com \
    bigquery.googleapis.com \
    --project="$PROJECT_ID"

# 2. Create the BigQuery dataset
echo "📊 Creating the BigQuery dataset..."
bq mk --dataset \
    --location=US \
    --description="AI Assistant log data" \
    "${PROJECT_ID}:logging"

# 3. Create the log sink
echo "📝 Creating the log sink..."
gcloud logging sinks create ai-assistant-logs \
    "bigquery.googleapis.com/projects/${PROJECT_ID}/datasets/logging" \
    --log-filter='resource.type="cloud_run_revision" AND resource.labels.service_name="chat-service-enhanced"' \
    --project="$PROJECT_ID"

# 4. Grant the sink's writer identity access to BigQuery
echo "🔐 Configuring sink permissions..."
SINK_SA=$(gcloud logging sinks describe ai-assistant-logs \
    --project="$PROJECT_ID" \
    --format='value(writerIdentity)')

gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="$SINK_SA" \
    --role="roles/bigquery.dataEditor"

# 5. Create logs-based metrics
echo "📊 Creating logs-based metrics..."
# A counter metric created from --log-filter is DELTA/INT64 by default,
# so no value-extractor or metric-kind flags are needed.
gcloud logging metrics create error_count \
    --project="$PROJECT_ID" \
    --description="Error log count" \
    --log-filter='severity>=ERROR AND resource.type="cloud_run_revision" AND resource.labels.service_name="chat-service-enhanced"'

# 6. Create the notification channel
echo "📧 Creating the alert notification channel..."
CHANNEL_ID=$(gcloud alpha monitoring channels create \
    --display-name="AI Assistant Alerts" \
    --type=email \
    --channel-labels=email_address="$EMAIL" \
    --project="$PROJECT_ID" \
    --format="value(name)")

echo "✅ Notification channel: $CHANNEL_ID"

# 7. Deploy the monitoring dashboard
echo "📊 Deploying the monitoring dashboard..."
sed "s/PROJECT_ID/$PROJECT_ID/g" monitoring/dashboard.json > /tmp/dashboard-final.json
gcloud monitoring dashboards create \
    --config-from-file=/tmp/dashboard-final.json \
    --project="$PROJECT_ID"

# 8. Set the Cloud Trace sampling rate
echo "🔍 Configuring Cloud Trace..."
echo "TRACE_SAMPLE_RATE=0.1" >> .env.production

# 9. Print the results
echo ""
echo "✅ Observability stack deployed!"
echo ""
echo "📊 Dashboards: https://console.cloud.google.com/monitoring/dashboards?project=$PROJECT_ID"
echo "🔍 Logs: https://console.cloud.google.com/logs?project=$PROJECT_ID"
echo "📈 Traces: https://console.cloud.google.com/traces?project=$PROJECT_ID"
echo "💰 Billing: https://console.cloud.google.com/billing?project=$PROJECT_ID"
echo ""
echo "📝 Next steps:"
echo "1. Set up Cloud Billing Export to track actual cost"
echo "2. Tune alert thresholds against real traffic"
echo "3. Review the cost optimization suggestions regularly"
echo ""

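Step 8 of the script only appends `TRACE_SAMPLE_RATE=0.1` to `.env.production`; the application still has to read that value. The sketch below shows one way the service might parse it. The helper name `read_trace_sample_rate` and the fallback of 0.1 are assumptions for illustration. In a real service the result would typically be passed to a ratio-based sampler, such as OpenTelemetry's `TraceIdRatioBased`.

```python
import os


def read_trace_sample_rate(default: float = 0.1) -> float:
    """Parse TRACE_SAMPLE_RATE from the environment and clamp it to [0.0, 1.0]."""
    raw = os.environ.get("TRACE_SAMPLE_RATE", "")
    try:
        rate = float(raw)
    except ValueError:
        # Unset or malformed value: fall back to the default
        return default
    return min(1.0, max(0.0, rate))


if __name__ == "__main__":
    os.environ["TRACE_SAMPLE_RATE"] = "0.1"
    print(read_trace_sample_rate())
```

Clamping keeps a typo such as `10` (meant as 10%) from silently tracing every request.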

9) Production Readiness Checklist

✅ Must-check items before launch

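## Logging
- [ ] StructuredLogHandler is enabled (confirm output lands in jsonPayload)
- [ ] Sensitive data is masked (hash user_id, do not log raw prompts)
- [ ] Log retention is configured (90 days recommended)
- [ ] BigQuery sink is created and write-tested
- [ ] Trace ID is injected into logs (so logs link to traces in the Console)

## Metrics
- [ ] All metric descriptors are created
- [ ] DELTA/GAUGE are used correctly (GAUGE for latency, DELTA for counts)
- [ ] MonitoredResource is set dynamically per environment
- [ ] Metric write failures do not affect the main request flow
- [ ] P50/P95/P99 latency tracking works

## Tracing
- [ ] Cloud Trace is enabled
- [ ] Sampling rate is set (10% recommended for production)
- [ ] Spans are added on critical paths
- [ ] FastAPI/HTTPX auto-instrumentation is enabled

## Alerting
- [ ] Logs-based metrics are created
- [ ] High-latency alert is configured (P95 > 3s)
- [ ] Error-rate alert is configured (> 5%)
- [ ] Cost overrun alert is configured
- [ ] Notification channel is tested (send a test alert)
- [ ] On-call rotation is in place

## Cost control
- [ ] Application-level cost estimate logging is enabled
- [ ] Cloud Billing Export is configured
- [ ] Weekly cost report is automated
- [ ] Cost anomalies trigger automatic alerts
- [ ] Cost optimization suggestions are reviewed regularly

## Security and compliance
- [ ] User ID is extracted securely from the JWT (not taken from a header)
- [ ] PII/PHI is masked or hashed
- [ ] Audit logs are complete
- [ ] Access follows least privilege

## Performance optimization
- [ ] Model selection strategy is implemented (Flash for simple tasks)
- [ ] Response caching is enabled
- [ ] Prompt length is optimized
- [ ] Batch processing has been considered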

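The latency items in the checklist (P50/P95/P99 tracking and the P95 > 3s alert) can be sanity-checked offline with a few lines of standard-library Python. This is only a sketch: `latency_percentiles` and `breached_targets` are hypothetical helpers, the P95 limit comes from the checklist, and the P50 and P99 limits mirror the targets used elsewhere in this article.

```python
import statistics
from typing import Dict, List

# Limits in milliseconds; P95 is from the checklist, P50/P99 are assumed targets
LATENCY_TARGETS_MS: Dict[str, float] = {"p50": 1000, "p95": 3000, "p99": 5000}


def latency_percentiles(samples_ms: List[float]) -> Dict[str, float]:
    """Compute P50/P95/P99 from raw latency samples (needs at least 2 samples)."""
    cuts = statistics.quantiles(samples_ms, n=100)  # returns 99 cut points
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


def breached_targets(samples_ms: List[float]) -> List[str]:
    """Return the percentile names whose observed value exceeds its limit."""
    observed = latency_percentiles(samples_ms)
    return [name for name, limit in LATENCY_TARGETS_MS.items() if observed[name] > limit]


if __name__ == "__main__":
    # 90 fast requests and 10 slow ones: the median is fine, the tail is not
    samples = [100.0] * 90 + [4000.0] * 10
    print(latency_percentiles(samples))
    print(breached_targets(samples))
```

Running this against latencies exported from the logs is a quick way to confirm that an alert threshold would have fired on known bad periods before relying on it in production.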

10) Golden Signals and SLOs

The four golden signals

# Key monitoring targets for production
GOLDEN_SIGNALS = {
    "latency": {
        "p50_target": 1000,  # ms
        "p95_target": 3000,  # ms
        "p99_target": 5000,  # ms
        "metric": "custom.googleapis.com/ai_assistant/interaction_latency"
    },
    "traffic": {
        "target_qps": 100,
        "metric": "run.googleapis.com/request_count"
    },
    "errors": {
        "target_rate": 0.01,  # 1%
        "metric": "logging.googleapis.com/user/error_count"
    },
    "saturation": {
        "cpu_target": 0.8,  # 80%
        "memory_target": 0.8,
        "metric": "run.googleapis.com/container/cpu/utilizations"
    }
}

# SLO 定義
SLO = {
    "availability": {
        "target": 0.999,  # 99.9%
        "window": "30d"
    },
    "latency": {
        "target": 0.95,  # 95% of requests complete in < 3s
        "threshold_ms": 3000,
        "window": "30d"
    },
    "error_rate": {
        "target": 0.99,  # 99% of requests succeed
        "window": "30d"
    }
}

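An SLO target becomes actionable once it is turned into an error budget. The sketch below works through that arithmetic for the values in the `SLO` dictionary. The function names `error_budget_minutes` and `burn_rate` are hypothetical, and the burn-rate definition used here (observed error rate divided by allowed error rate) is the common SRE convention rather than something this article specifies.

```python
def error_budget_minutes(target: float, window_days: int) -> float:
    """Allowed downtime, in minutes, for an availability target over a window."""
    total_minutes = window_days * 24 * 60
    return total_minutes * (1.0 - target)


def burn_rate(observed_error_rate: float, target: float) -> float:
    """How fast the budget is being consumed; 1.0 means exactly on budget."""
    return observed_error_rate / (1.0 - target)


if __name__ == "__main__":
    # 99.9% availability over 30 days leaves roughly 43 minutes of downtime
    print(round(error_budget_minutes(0.999, 30), 1))
    # A 2% error rate against a 99% success target burns the budget about twice as fast
    print(round(burn_rate(0.02, 0.99), 2))
```

A burn rate that stays above 1.0 means the 30-day budget will run out before the window ends, which is usually a better alerting signal than a fixed error-rate threshold.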

